## Initialization
# Make sure the mongolite prerequisites are installed: https://jeroen.github.io/mongolite/index.html#requirements-linux-mac

# Install any missing packages, then attach all of them.
# requireNamespace() looks up one package by name. Testing membership in
# installed.packages() compares against every cell of the package matrix
# (not only the package names), and R's documentation advises against using
# it to check whether a named package is installed.
requiredPackages = c("mongolite", "timelineS", "dplyr", "tidyr", "ggplot2")
for (requiredPackage in requiredPackages) {
  if (!requireNamespace(requiredPackage, quietly = TRUE)) {
    install.packages(requiredPackage)
  }
}
library(mongolite)
library(timelineS)
library(dplyr, warn.conflicts = FALSE)
library(tidyr)
library(ggplot2)

# Connection string of the MongoDB database that holds the benchmark results
dbUrl = "mongodb://127.0.0.1:27017/i3ql-benchmarks"
db = mongo(url = dbUrl)

# Get available test executions
#
# An execution id is made of the first three dot-separated parts of a
# collection name (e.g. "company.query1.0002").
# NOTE(review): only the first batch of the listCollections cursor is read;
# confirm that it always contains every collection of the database.

collectionNames = db$run('{"listCollections":1}')$cursor$firstBatch$name
explodedCollectionNames = strsplit(collectionNames, "\\.")

# Collection names with fewer than three parts cannot form an execution id;
# pasting their missing parts would produce bogus ids containing "NA".
hasExecutionId = lengths(explodedCollectionNames) >= 3
executions = unique(vapply(
  explodedCollectionNames[hasExecutionId],
  function(nameParts) paste(nameParts[1:3], collapse = "."),
  character(1)
))

Executions

The following executions are evaluated in this report:

executions
##  [1] "company.query8.0002"  "company.query2.0002"  "company.query5.0002" 
##  [4] "company.query4.0002"  "company.query9.0002"  "company.query10.0002"
##  [7] "company.query1.0002"  "company.query6.0002"  "company.query3.0002" 
## [10] "company.query7.0002"
# Read in all data
#
# For every execution the collections "<execution>.config", ".event",
# ".performance" and ".throughput" are read, and the documents of all
# executions are stacked into one data frame per kind.

# Reads the "<execution>.<kind>" collection and tags every document with the
# execution id it belongs to.
readExecutionCollection = function(execution, kind) {
  collection = mongo(paste(execution, kind, sep = "."), url = dbUrl)
  # Release the connection once the documents have been fetched
  on.exit(collection$disconnect(), add = TRUE)
  mutate(collection$find(), execution = execution)
}

# Stacks the documents of one collection kind over all executions.
# Binding a list once avoids re-copying the accumulated data frame on every
# loop iteration. With no executions this yields an empty data frame
# (previously NULL).
readAllExecutions = function(kind) {
  bind_rows(lapply(executions, readExecutionCollection, kind = kind))
}

configData = readAllExecutions("config")
eventData = readAllExecutions("event")
performanceData = readAllExecutions("performance")
throughputData = readAllExecutions("throughput")

# Reference timestamps per (node, execution), taken from the section events:
#   timeOffset   - median time of "section.measurement.enter" (^= measurement start)
#   timeStarted  - earliest time of "section.measurement-recording.enter"
#   timeFinished - median time of "section.measurement-finished.enter" (^= measurement finished)
timeOffsets = eventData %>%
  filter(grepl("^section\\.measurement\\.enter$", event)) %>%
  group_by(node, execution) %>%
  summarize(timeOffset = median(time)) %>%
  ungroup()

measurementStarted = eventData %>%
  filter(grepl("^section\\.measurement-recording\\.enter$", event)) %>%
  group_by(node, execution) %>%
  summarize(timeStarted = min(time)) %>%
  ungroup()

# NOTE(review): the remark "real end might be even up to 5 seconds earlier"
# presumably refers to timeFinished - confirm.
measurementFinished = eventData %>%
  filter(grepl("^section\\.measurement-finished\\.enter$", event)) %>%
  group_by(node, execution) %>%
  summarize(timeFinished = median(time)) %>%
  ungroup()

# Only (node, execution) pairs that have all three timestamps are kept
offsetStartFinish = timeOffsets %>%
  inner_join(measurementStarted, by = c("node", "execution")) %>%
  inner_join(measurementFinished, by = c("node", "execution"))

# Attach the reference timestamps to every record and express `time` relative
# to the measurement start. Performance and throughput samples are also
# restricted to the [timeStarted, timeFinished] window (checked on the
# original, unshifted time).
eventData = eventData %>%
  left_join(offsetStartFinish, by = c("node", "execution")) %>%
  mutate(time = time - timeOffset) %>%
  arrange(time)

performanceData = performanceData %>%
  left_join(offsetStartFinish, by = c("node", "execution")) %>%
  filter(time >= timeStarted, time <= timeFinished) %>%
  mutate(time = time - timeOffset) %>%
  arrange(time)

throughputData = throughputData %>%
  left_join(offsetStartFinish, by = c("node", "execution")) %>%
  filter(time >= timeStarted, time <= timeFinished) %>%
  mutate(time = time - timeOffset) %>%
  arrange(time)

# Label every throughput sample with "<node>#<relationName>"
throughputData = throughputData %>%
  mutate(relation = paste(node, relationName, sep = "#"))

# Shift cpu time so that each (node, execution) series starts at 0
performanceData = performanceData %>%
  group_by(node, execution) %>%
  mutate(cpuTime = cpuTime - min(cpuTime)) %>%
  ungroup()

# Split the events into span starts ("<event>.enter") and span ends
# ("<event>.exit"); the suffix is stripped so both sides share the event name.
eventSpanEnters = eventData %>%
  filter(grepl("\\.enter$", event)) %>%
  mutate(event = sub("\\.enter$", "", event)) %>%
  rename(enterTime = time)

eventSpanExits = eventData %>%
  filter(grepl("\\.exit$", event)) %>%
  mutate(event = sub("\\.exit$", "", event)) %>%
  rename(exitTime = time)

# Section spans: enter and exit of the same "section.*" event on the same node
sectionSpanData = eventSpanEnters %>%
  inner_join(eventSpanExits, by = c("node", "execution", "event", "timeOffset")) %>%
  mutate(duration = exitTime - enterTime) %>%
  filter(grepl("^section\\.", event))

# Latency spans: a "latency.*" event is entered on one node and exited on
# another, so enter and exit are matched per execution and event only.
latencySpanEnters = eventSpanEnters %>%
  rename(enterNode = node) %>%
  select(-timeOffset, -timeStarted, -timeFinished) %>%
  filter(grepl("^latency\\.", event))

latencySpanExits = eventSpanExits %>%
  rename(exitNode = node) %>%
  select(-timeOffset, -timeStarted, -timeFinished) %>%
  filter(grepl("^latency\\.", event))

# With several source events only the latest enter is kept, since it blocked
# all previous ones. `trace` is the part of the event name after the last dot.
latencyData = latencySpanEnters %>%
  inner_join(latencySpanExits, by = c("execution", "event")) %>%
  mutate(duration = exitTime - enterTime) %>%
  group_by(execution, event) %>%
  filter(enterTime == max(enterTime)) %>%
  ungroup() %>%
  mutate(trace = sub("^.*\\.", "", event))

# Memory events carry their payload in the event name
# ("memory.<measure>.<usage>"): the last dot-separated part is parsed as the
# numeric usage, the part between the "memory." prefix and it is the measure.
memoryData = eventData %>%
  filter(grepl("^memory\\.", event)) %>%
  transmute(
    node,
    execution,
    measure = sub("^memory\\.", "", sub("\\.[^.]*$", "", event)),
    usage = as.numeric(sub("^memory\\..*\\.", "", event))
  )

Execution timetable

# Timeline of all sections per node. The execution id is split into the
# benchmark (everything before its last two parts) and the remainder after
# its first part; times are divided by 1000 for plotting.
sectionSpanPlot = sectionSpanData %>%
  rename(time = enterTime) %>%
  mutate(
    # benchmark must be derived before `execution` is overwritten below
    benchmark = sub("\\.[^.]*\\.[^.]*$", "", execution),
    execution = sub("[^.]*\\.", "", execution),
    time = time / 1000,
    exitTime = exitTime / 1000
  )
timelineG(
  df = sectionSpanPlot,
  start = "time",
  end = "exitTime",
  names = "node",
  phase = "event",
  group1 = "execution",
  group2 = "benchmark"
)

Offset precision analysis

# Pair every section span with the spans of the same section on the other
# nodes of the same execution. Each pair of nodes appears in both orientations.
allBarrierPairsTwice = left_join(sectionSpanData, sectionSpanData, by = c("execution", "event")) %>% filter(node.x != node.y) %>% distinct
# allBarrierPairsTwice leads to a symmetric density, since each start time difference occurs once positive and once negative.
# Keep only the orientation with a non-negative difference. When both nodes
# entered at the same time, `>=` alone would keep both orientations and count
# the zero difference twice, so ties are broken by node name.
allBarrierPairs = filter(
  allBarrierPairsTwice,
  enterTime.y > enterTime.x |
    (enterTime.y == enterTime.x & as.character(node.x) < as.character(node.y))
)
# Spread of the section enter delay per execution and section
group_by(allBarrierPairs, execution, event) %>% summarize(var = var(enterTime.y - enterTime.x), mean = mean(enterTime.y - enterTime.x), sd = sd(enterTime.y - enterTime.x)) %>% ungroup
durPlot(df = allBarrierPairs, start = "enterTime.x", end="enterTime.y", facet = TRUE, other = (facet_wrap(~execution, scales="free")), plot_type = "density", group = "execution", title = TRUE, title_density = "Section enter delay distribution")

Performance

# Memory usage over time, one line per node and one panel per execution.
# time is divided by 1000 and memory by 1024^2 to match the axis labels.
print(
  ggplot(
    performanceData,
    aes(x = time / 1000, y = memory / 1024^2, colour = node, group = node)
  ) +
    geom_line() +
    labs(title = "Memory Usage", x = "Time [s]", y = "Memory [MB]") +
    facet_wrap(~execution, scales = "free")
)

# Memory usage reported by the memory events: one bar per node and measure,
# one panel per execution.
print(ggplot(memoryData, aes(y=usage/1024^2, x=node, fill=measure, color=node)) +
        geom_bar(position="dodge", stat="identity") +
        ggtitle("Memory Usage") +
        # The x axis shows the nodes; it was previously mislabelled "Time [s]"
        xlab("Node") +
        ylab("Memory [MB]") +
        facet_wrap(~execution))

# CPU load over time, one line per node and one panel per execution
print(
  ggplot(
    performanceData,
    aes(x = time / 1000, y = cpuLoad, colour = node, group = node)
  ) +
    geom_line() +
    labs(title = "CPU Load", x = "Time [s]", y = "CPU Load") +
    facet_wrap(~execution, scales = "free")
)

# CPU time (divided by 1000^3 to match the [s] axis label) over time,
# one line per node and one panel per execution
print(
  ggplot(
    performanceData,
    aes(x = time / 1000, y = cpuTime / 1000^3, colour = node, group = node)
  ) +
    geom_line() +
    labs(title = "CPU Time", x = "Time [s]", y = "CPU Time [s]") +
    facet_wrap(~execution, scales = "free")
)

Throughput

# Event count over time: relations compared within each execution
print(
  ggplot(
    throughputData,
    aes(x = time / 1000, y = eventCount, colour = relation, group = relation)
  ) +
    geom_line() +
    labs(title = "Event Count", x = "Time [s]", y = "Number of Events processed") +
    facet_wrap(~execution, scales = "free")
)

# Event count over time: executions compared within each relation
print(
  ggplot(
    throughputData,
    aes(x = time / 1000, y = eventCount, colour = execution, group = execution)
  ) +
    geom_line() +
    labs(title = "Event Count", x = "Time [s]", y = "Number of Events processed") +
    facet_wrap(~relation, scales = "free")
)

# Differences between consecutive samples of each (execution, relation)
# series; the first sample of a series has no predecessor and is dropped.
eventRateData = throughputData %>%
  group_by(execution, relation) %>%
  mutate(
    timeSpan = time - lag(time),
    intervalEventCount = eventCount - lag(eventCount)
  ) %>%
  filter(!is.na(timeSpan))

# Event processing rate: relations compared within each execution
print(
  ggplot(
    eventRateData,
    aes(
      x = time / 1000,
      y = intervalEventCount / (timeSpan / 1000),
      colour = relation,
      group = relation
    )
  ) +
    geom_line() +
    labs(title = "Event Processing Rate", x = "Time [s]", y = "Event Processing Rate [/s]") +
    facet_wrap(~execution, scales = "free")
)

# Event processing rate: executions compared within each relation
print(
  ggplot(
    eventRateData,
    aes(
      x = time / 1000,
      y = intervalEventCount / (timeSpan / 1000),
      colour = execution,
      group = execution
    )
  ) +
    geom_line() +
    labs(title = "Event Processing Rate", x = "Time [s]", y = "Event Processing Rate [/s]") +
    facet_wrap(~relation, scales = "free")
)

# Entry count over time: relations compared within each execution
print(
  ggplot(
    throughputData,
    aes(x = time / 1000, y = entryCount, colour = relation, group = relation)
  ) +
    geom_line() +
    labs(title = "Entry Count", x = "Time [s]", y = "Number of Entries in the Relation") +
    facet_wrap(~execution, scales = "free")
)

# Entry count over time: executions compared within each relation
print(
  ggplot(
    throughputData,
    aes(x = time / 1000, y = entryCount, colour = execution, group = execution)
  ) +
    geom_line() +
    labs(title = "Entry Count", x = "Time [s]", y = "Number of Entries in the Relation") +
    facet_wrap(~relation, scales = "free")
)

# Differences between consecutive samples of each (execution, relation)
# series; the first sample of a series has no predecessor and is dropped.
entryRateData = throughputData %>%
  group_by(execution, relation) %>%
  mutate(
    timeSpan = time - lag(time),
    intervalEntryCount = entryCount - lag(entryCount)
  ) %>%
  filter(!is.na(timeSpan))

# Entry change rate: relations compared within each execution
print(
  ggplot(
    entryRateData,
    aes(
      x = time / 1000,
      y = intervalEntryCount / (timeSpan / 1000),
      colour = relation,
      group = relation
    )
  ) +
    geom_line() +
    labs(title = "Entry Change Rate", x = "Time [s]", y = "Entry Change Rate [/s]") +
    facet_wrap(~execution, scales = "free")
)

# Entry change rate: executions compared within each relation
print(
  ggplot(
    entryRateData,
    aes(
      x = time / 1000,
      y = intervalEntryCount / (timeSpan / 1000),
      colour = execution,
      group = execution
    )
  ) +
    geom_line() +
    labs(title = "Entry Change Rate", x = "Time [s]", y = "Entry Change Rate [/s]") +
    facet_wrap(~relation, scales = "free")
)

Latency

# Latency of every trace plotted against the time its result arrived,
# one panel per execution
print(
  ggplot(
    latencyData,
    aes(x = exitTime / 1000, y = duration, colour = trace, group = trace)
  ) +
    geom_point() +
    geom_line() +
    labs(title = "Latency over Time", x = "Result Time [s]", y = "Latency [ms]") +
    facet_wrap(~execution, scales = "free")
)

# Density of the latency (exitTime - enterTime) per execution
durPlot(df = latencyData, start = "enterTime", end="exitTime", facet = TRUE, other = (facet_wrap(~execution, scales="free")), plot_type = "density", group = "execution", title = TRUE, title_density = "Latency Distribution")

# Mean latency per execution, so that every panel gets its own reference line.
# The line used to be drawn at mean(duration / 1000), i.e. in seconds on an
# axis in ms, and as a single mean over all executions.
latencyMeans = group_by(latencyData, execution) %>% summarize(meanDuration = mean(duration)) %>% ungroup
print(ggplot(latencyData, aes(duration, colour=trace, group=trace)) +
        geom_histogram(aes(y=..density..), binwidth=20) +
        geom_density() +
        # geom_vline does not inherit the plot aesthetics, so latencyMeans
        # needs no `trace` column
        geom_vline(data = latencyMeans, aes(xintercept = meanDuration)) +
        ggtitle("Latency Distribution") +
        xlab("Latency [ms]") + 
        ylab("") +
        facet_wrap(~execution, scales="free"))

# Latency statistics per execution
group_by(latencyData, execution) %>% summarize(var = var(duration), mean = mean(duration), sd = sd(duration)) %>% ungroup

Configuration

# Show the configuration documents collected from all executions
configData